package com.broker.utils.events.support.kafka;

import com.broker.utils.events.TopicTag;
import com.broker.utils.events.support.IMQConsumer;
import com.broker.utils.events.support.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

@Slf4j
public class BrokerKafkaConsumer implements IMQConsumer {
    private final MQConfig kafkaConfig;
    private KafkaConsumer<String, String> consumer = null;
    private Map<TopicTag, List<BiConsumer<String, String>>> topicConsumers = new HashMap<>();
    private ExecutorService singleThreadExecutor;
    private Runnable task;
    private boolean taskExit = false;
    public BrokerKafkaConsumer(MQConfig kafkaConfig){
        this.kafkaConfig = kafkaConfig;
        init();
    }

    private void init(){
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaConfig.getMqServerUrl());
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        this.singleThreadExecutor = Executors.newSingleThreadExecutor();
        this.task = ()->{
            while (!taskExit) {
                ConsumerRecords<String, String> records = consumer.poll(0);
                for (ConsumerRecord<String, String> record : records){
                    String topic = record.topic();
                    String tag = record.key();
                    String msg = record.value();
                    List<BiConsumer<String, String>> consumers = topicConsumers.getOrDefault(new TopicTag().setTopic(topic).setTag(tag), new ArrayList<>());
                    if(!consumers.isEmpty()){
                        log.debug("Kafka: consume message offset:" + record.offset() +"   TOPIC:" + topic +"   OBJECT:" + tag);
                        consumers.forEach(c->{
                            c.accept(tag, msg);
                        });
                    }
                }
            }
        };
    }
    // 添加订阅
    public void appendSubscribe(String topic, String tag, BiConsumer<String, String> consumer){
        List<BiConsumer<String, String>> consumers = topicConsumers.getOrDefault(new TopicTag().setTopic(topic).setTag(tag), new ArrayList<>());
        consumers.add(consumer);
        topicConsumers.put(new TopicTag().setTopic(topic).setTag(tag), consumers);
    }

    public void start(String topic){
        AtomicBoolean consumerFinished = new AtomicBoolean(false);
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //将偏移设置到最开始
//                consumer.seekToBeginning(collection);
                //将偏移设置到最后
                consumer.seekToEnd(collection);
                consumerFinished.set(true);
            }
        });
        if(this.singleThreadExecutor != null){
            this.singleThreadExecutor.execute(task);
        }
        while (!consumerFinished.get());
    }

    public void stop(){
        taskExit = true;
        if(singleThreadExecutor != null){
            singleThreadExecutor.shutdown();
        }
        this.consumer.close();
    }

}
